3.框架详细介绍

您所在的位置:网站首页 python 消息队列框架 3.框架详细介绍

3.框架详细介绍

2023-10-05 01:07| 来源: 网络整理| 查看: 265

3.4 框架的乞丐精简版实现方式

由于框架的功能十分多,如果没学习36种设计模式,就很难看懂源码,现在演示精简实现原理

此精简例子十分之简单明了,就是死循环从中间件取任务然后丢到线程池里面执行。

此代码在 funboost/beggar_version_implementation/beggar_redis_consumer.py

这样简单明了,演示了基本原理,但是这个缺少消费确认(随意重启代码会造成大量任务丢失) qps恒定等20种功能。

def start_consuming_message(queue_name, consume_function, threads_num=50): pool = ThreadPoolExecutor(threads_num) while True: try: redis_task = redis.brpop(queue_name, timeout=60) if redis_task: task_str = redis_task[1].decode() print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}') pool.submit(consume_function, **json.loads(task_str)) else: print(f'redis的 {queue_name} 队列中没有任务') except redis.RedisError as e: print(e) if __name__ == '__main__': import time def add(x, y): time.sleep(5) print(f'{x} + {y} 的结果是 {x + y}') # 推送任务 for i in range(100): print(i) redis.lpush('test_beggar_redis_consumer_queue', json.dumps(dict(x=i, y=i * 2))) start_consuming_message('test_beggar_redis_consumer_queue', consume_function=add, threads_num=10)


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3